package org.snake.nebulae.core.model;

import cn.hutool.core.date.DateUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.snake.nebulae.core.exception.ResponseException;
import org.snake.nebulae.core.task.CoreTask;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

@Data
@EqualsAndHashCode(callSuper = false)
@NoArgsConstructor
@Slf4j
public class ServiceHandleAdapter extends ServiceHandle {

    /**
     * mqtt连接客户端
     */
    protected MqttAsyncClient mqttAsyncClient;

    /**
     * 服务信息
     */
    protected ServiceInfo serviceInfo;

    /**
     * json
     */
    protected ObjectMapper objectMapper;

    public ServiceHandleAdapter(MqttAsyncClient mqttAsyncClient, ServiceInfo serviceInfo, ObjectMapper objectMapper) {
        this.mqttAsyncClient = mqttAsyncClient;
        this.serviceInfo = serviceInfo;
        this.objectMapper = objectMapper;

        doInit();
    }

    /**
     * 返回处理过程出现错误
     */
    protected CompletableFuture<Exception> doReturnError = new CompletableFuture<>();

    /**
     * 返回处理过程完成
     */
    protected CompletableFuture<Object> doReturnSuccess = new CompletableFuture<>();

    /**
     * 设置初始化
     */
    @Override
    public void doInit() {

    }

    /**
     * 检查参数有效性
     *
     * @return 布尔
     */
    @Override
    public boolean doCheckParams(RequestInfo requestInfo) {
        return true;
    }

    /**
     * 主逻辑
     * 构建一个返回信息
     *
     * @param requestInfo 请求信息
     * @return 结果信息
     */
    @Override
    public Object doLogic(RequestInfo requestInfo) {
        return null;
    }

    /**
     * 结果信息
     *
     * @param result 结果
     */
    @Override
    public void doReturn(Object result) {
        try {
            ResponseInfo r = ((ResponseInfo) result);
            // 方法地址
            String methodPath = serviceInfo.getServicePath() + "/" + methodName;
            // 返回结果地址
            String responsePath = methodPath + CoreTask.topicResponseExt + "/" + r.getRequestInfo().getRequestId();

            String data = objectMapper.writeValueAsString(result);

            IMqttActionListener returnAction = new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken iMqttToken) {
                    log.info("{} 结果返回成功 {} 时间是 {}", this.getClass(), data, DateUtil.now());
                    doReturnSuccess.complete(result);
                }

                @Override
                public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
                    log.error("{} 结果返回失败 {} 时间是 {} 原因是 {}", this.getClass(), data, DateUtil.now(), throwable.getMessage());
                    doReturnError.complete(new ResponseException(throwable.getMessage()));
                }
            };
            mqttAsyncClient.publish(responsePath, data.getBytes(), 2, false, this, returnAction);
        } catch (JsonProcessingException | MqttException e) {
            doReturnError.complete(e);
        }
    }
}
